热门标签 | HotTags
当前位置:  开发笔记 > 大数据 > 正文

|NO.Z.00100|——|大数据技术|——|Hadoop与KafkaV07|——|KafkaV07源码解析|——|生产者与消费者流程详解V03|

本文深入解析了KafkaV07的源代码,详细阐述了生产者与消费者的流程机制。通过具体示例和代码分析,帮助读者全面理解Kafka的数据传输和处理过程,为实际应用提供理论支持和技术指导。



[BigDataHadoop:Hadoop&kafka.V07]                                                                          [BigDataHadoop.kafka][|章节四|Hadoop生态圈技术栈|kafka|源码剖析|Kafka源码剖析之Producer消费者流程|]





一、自动提交

### --- 自动提交
~~~ 最简单的提交方式是让悄费者自动提交偏移量。
~~~ 如果enable.auto.commit被设为 true,消费者会自动把从 poll() 方法接收到的最大偏移量提交上去。
~~~ 提交时间间隔由 auto.commit.interval.ms 控制,默认值是 5s。
~~~ 与消费者里的其他东西 一样,自动提交也是在轮询(poll() )里进行的。
~~~ 消费者每次在进行轮询时会检查是否该提交偏移量了,
~~~ 如果是,那 么就会提交从上一次轮询返回的偏移量。
~~~ 不过,这种简便的方式也会带来一些问题,

### --- 来看一下下面的例子:
~~~ 假设我们仍然使用默认的 5s提交时间间隔,在最近一次提交之后的 3s发生了再均衡,
~~~ 再 均衡之后,消费者从最后一次提交的偏移量位置开始读取消息。
~~~ 这个时候偏移量已经落后 了 3s,所以在这 3s 内到达的消息会被重复处理。
~~~ 可以通过修改提交时间间隔来更频繁地提交偏移量,减小可能出现重复消息的时间窗,
~~~ 不过这种情况是无也完全避免的

二、手动提交


### --- 同步提交
~~~ 取消自动提交,把 auto.commit.offset 设为 false,让应用程序决定何时提交 偏 移量。
~~~ 使用commitSync() 提交偏移量最简单也最可靠。
~~~ 这个 API会提交由 poll() 方法返回 的最新偏移量,提交成功后马上返回,如果提交失败就抛出异常

while (true) {
// 消息拉取
ConsumerRecords records = consumer.poll(100);
for (ConsumerRecord record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(),record.key(), record.value());
}
// 处理完成单次消息以后,提交当前的offset,如果提交失败就抛出异常
consumer.commitSync();
}

### --- 异步提交
~~~ 同步提交有一个不足之处,
~~~ 在 broker对提交请求作出回应之前,应用程序会一直阻塞,这样会限制应用程序的吞吐量。
~~~ 我们可以通过降低提交频率来提升吞吐量,但如果发生了再均衡, 会增加重复消息的数量。
~~~ 这个时候可以使用异步提交 API。我们只管发送提交请求,无需等待 broker的响应。

while (true) {
// 消息拉取
ConsumerRecords records = consumer.poll(100);
for (ConsumerRecord record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n",
record.offset(), record.key(), record.value());
}

// 异步提交
consumer.commitAsync((offsets, exception) -> {
exception.printStackTrace();
System.out.println(offsets.size());
});
}



===============================END===============================



Walter Savage Landor:strove with none,for none was worth my strife.Nature I loved and, next to Nature, Art:I warm'd both hands before the fire of life.It sinks, and I am ready to depart                                                                                                                                                   ——W.S.Landor



来自为知笔记(Wiz)



推荐阅读
  • 技术日志:深入探讨Spark Streaming与Spark SQL的融合应用
    技术日志:深入探讨Spark Streaming与Spark SQL的融合应用 ... [详细]
  • 在Linux系统中,原本已安装了多个版本的Python 2,并且还安装了Anaconda,其中包含了Python 3。本文详细介绍了如何通过配置环境变量,使系统默认使用指定版本的Python,以便在不同版本之间轻松切换。此外,文章还提供了具体的实践步骤和注意事项,帮助用户高效地管理和使用不同版本的Python环境。 ... [详细]
  • 乐观锁_乐观锁 VS 悲观锁
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了乐观锁VS悲观锁相关的知识,希望对你有一定的参考价值。 ... [详细]
  • FileBeat + Flume + Kafka + HDFS + Neo4j + SparkStreaming + MySQL:【案例】三度关系推荐V1.0版本11:每周一计算最近一月主播视频评级
    一、数据计算步骤汇总下面我们通过文字梳理一下具体的数据计算步骤。第一步:历史粉丝关注数据初始化第二步:实时维护粉丝关注数据第三步:每天定 ... [详细]
  • Storm集成Kakfa
    一、整合说明Storm官方对Kafka的整合分为两个版本,官方说明文档分别如下:StormKafkaIntegratio ... [详细]
  • Hudi是一种数据湖的存储格式,在Hadoop文件系统之上提供了更新数据和删除数据的能力以及流式消费变化数据的能力。应用场景近实时数据摄取Hudi支持插入、更新和删除数据的能力。您 ... [详细]
  • 深入解析队列机制及其广泛的应用场景
    本文深入探讨了队列机制的核心原理及其在多种应用场景中的广泛应用。首先,文章详细解析了队列的基本概念、操作方法及其时间复杂度。接着,通过具体实例,阐述了队列在操作系统任务调度、网络通信、事件处理等领域的实际应用。此外,文章还对比了队列与其他常见数据结构(如栈、数组和链表)的优缺点,帮助读者更好地理解和选择合适的数据结构。最后,通过具体的编程示例,进一步巩固了对队列机制的理解和应用。 ... [详细]
  • 分布式一致性算法:Paxos 的企业级实战
    一、简介首先我们这个平台是ES专题技术的分享平台,众所周知,ES是一个典型的分布式系统。在工作和学习中,我们可能都已经接触和学习过多种不同的分布式系统了,各 ... [详细]
  • 字节Java高级岗:java开发cpu吃多线程吗
    前言抱着侥幸心理投了字节跳动后台JAVA开发岗,居然收到通知去面试,一面下整个人来都是懵逼的,不知道我对着面试官都说了些啥(捂脸~~)。侥幸一面居然过了,三天后接到二面通知,结果这 ... [详细]
  • 又给自己挖了一个坑跳进去。KafkaManager使用单例模型获取到一个producer,然而自己代码里用的时候加了一个using然后自己在做测试的时候,for循环加10条数据发送 ... [详细]
  • 一文了解消息中间件RabbitMQ
    消息中间件---RabbitMQ1消息中间件的作用2.常用的消息中间件3消息中间件RabbitMQ3.1RabbitMQ介绍3.3RabbitMQ的队列模式3.3RabbitMQ的 ... [详细]
  • SpringCloud之Bus(消息总线)
    说明:关于SpringCloud系列的文章中的代码都在码云上面地址:https:gitee.comzh_0209_javaspringcloud-ali ... [详细]
  • 启动activemq_「Java」SpringBoot amp; ActiveMQ
    一、消息队列消息队列中间件是分布式系统中重要的组件,主要解决应用耦合、异步消息、流量削锋等问题,实现高性能、高可用、可伸缩和最终一致性架构, ... [详细]
  • 基于2.1.0构造函数初始化accumulator,这是一个发送的缓冲队列管理器this.accumulatornewRecordAccumulator(logContext,co ... [详细]
  • Alibaba珍藏版mybatis手写文档,值得一读!
    一面问题:MySQLRedisKafka线程算法mysql知道哪些存储引擎,它们的区别mysql索引在什么情况下会失效mysql在项目中的优化场景&# ... [详细]
author-avatar
总会有办法的
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有